package com.stars.distributed.schedule.util;

import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

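/**
 * Distributed thread pool: every submitted task is handed directly to an idle worker thread or to a
 * newly created one; there is no task queue.
 *
 * @author guoguifang
 */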
public class DistributedThreadPoolExecutor {

    /** Acquired by the methods that iterate over or modify {@link #workers}. */
    private final ReentrantLock mainLock = new ReentrantLock();

    /**
     * Serializes changes to the core/max pool sizes. It is also left held by
     * {@link #getExecutableCountAndTryLockPoolSize()} when that method returns a value greater than zero.
     */
    private final ReentrantLock poolSizeLock = new ReentrantLock();

    /** All registered workers, kept in insertion order. */
    private final Set<Worker> workers = new LinkedHashSet<>();

    /** Largest number of workers registered so far; updated when a worker is added. */
    private int largestPoolSize;

    /** Maximum idle time of a non-core worker, stored in nanoseconds. */
    private volatile long keepAliveTime;

    /** Number of core workers. */
    private volatile int corePoolSize;

    /** Upper bound on the number of workers. */
    private volatile int maxPoolSize;

    /** Factory used to create the thread of every new worker. */
    private volatile ThreadFactory threadFactory;

    /**
     * 分布式线程池构造方法：当创建分布式线程池时默认预先创建corePoolSize个线程并将线程置为等待模式
     *
     * @param corePoolSize  核心线程数量
     * @param maxPoolSize   最大线程数量
     * @param keepAliveTime 非核心线程最大空闲时间
     * @param timeUnit 非核心线程最大空闲时间的时间单位
     */
    public DistributedThreadPoolExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit) {
        if (corePoolSize < 0 || maxPoolSize <= 0 || maxPoolSize < corePoolSize || keepAliveTime < 0) {
            throw new IllegalArgumentException();
        }
        this.corePoolSize = corePoolSize;
        this.maxPoolSize = maxPoolSize;
        this.keepAliveTime = timeUnit.toNanos(keepAliveTime);
        this.threadFactory = new DefaultDistributedThreadFactory();
        this.startAllCoreThreads();
    }

    /**
     * Counts how many tasks could be accepted right now and, when that count is positive, keeps the
     * pool-size lock held so the core/max sizes cannot be changed until {@link #unlockPoolSize()} is called.
     *
     * <p>Workers are scanned in insertion order; every worker without a task is counted and the scan
     * stops at the first idle non-core worker. If no idle worker exists but fewer than
     * {@code maxPoolSize} workers are registered, the result is {@code 1}.
     *
     * @return the number of idle workers counted as described above, {@code 1} when no worker is idle but a
     *         new one may still be created, or {@code 0} when nothing can be executed; in the {@code 0}
     *         case the pool-size lock is released before returning
     */
    public int getExecutableCountAndTryLockPoolSize() {
        int idleCount = 0;
        final ReentrantLock sizeGuard = this.poolSizeLock;
        sizeGuard.lock();
        try {
            final ReentrantLock workersGuard = this.mainLock;
            workersGuard.lock();
            try {
                for (Worker candidate : this.workers) {
                    if (candidate.task != null) {
                        continue;
                    }
                    idleCount++;
                    if (!candidate.isCore()) {
                        // An idle non-core worker ends the scan.
                        break;
                    }
                }
                if (idleCount == 0 && this.workers.size() < this.maxPoolSize) {
                    // Nobody is idle, but one more worker may still be created.
                    idleCount = 1;
                }
                return idleCount;
            } finally {
                workersGuard.unlock();
            }
        } finally {
            // The size lock stays held for the caller whenever something can be executed.
            if (idleCount == 0) {
                sizeGuard.unlock();
            }
        }
    }

    /**
     * Releases the lock that protects changes to the core and maximum pool sizes.
     */
    public void unlockPoolSize() {
        this.poolSizeLock.unlock();
    }

    /**
     * Executes the given task, either on the first registered worker that has no task or, if there is
     * none, on a newly created non-core worker.
     *
     * @param task the task to run
     * @return {@code true} if the task was handed to a worker; {@code false} if {@code task} is
     *         {@code null} or no worker could take it
     */
    public boolean execute(Runnable task) {
        if (task == null) {
            return false;
        }
        final ReentrantLock guard = this.mainLock;
        guard.lock();
        try {
            Worker idleWorker = null;
            for (Worker candidate : workers) {
                if (candidate.task == null) {
                    idleWorker = candidate;
                    break;
                }
            }
            if (idleWorker == null) {
                return addWorker(task, false);
            }
            return idleWorker.setTask(task);
        } finally {
            guard.unlock();
        }
    }

    /**
     * Main loop of a worker thread: waits while the worker has no task, runs the task once one is
     * assigned, and leaves the loop when {@link #processNonCoreWorkerExit(Worker)} reports that the worker
     * has been removed.
     *
     * <p>If the task throws, the exception is propagated (terminating the thread) exactly as before, but
     * the worker is now also removed from {@link #workers}. Previously the dead worker stayed registered
     * with its task still set, so it was never picked again and permanently occupied a pool slot.
     *
     * @param worker the worker whose thread is executing this loop
     */
    private void runWorker(Worker worker) {
        // The worker is created with state -1; this release moves it to 0.
        worker.unlock();
        boolean exitedNormally = false;
        try {
            while (true) {
                Runnable task = worker.task;
                if (task == null) {
                    // NOTE: workers.size() is read here without holding mainLock.
                    if (worker.isCore() || this.workers.size() <= this.maxPoolSize) {
                        worker.awaitNanos(keepAliveTime);
                    }
                    if (processNonCoreWorkerExit(worker)) {
                        exitedNormally = true;
                        break;
                    }
                    continue;
                }
                worker.lock();
                try {
                    task.run();
                } catch (RuntimeException | Error x) {
                    throw x;
                } catch (Throwable x) {
                    throw new Error(x);
                } finally {
                    worker.unlock();
                }
                // Marks the worker as idle again.
                worker.terminate();
            }
        } finally {
            if (!exitedNormally) {
                // The thread is dying because the task threw: free the slot held by this worker.
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    workers.remove(worker);
                } finally {
                    mainLock.unlock();
                }
            }
        }
    }

    /**
     * Decides whether the given worker should stop and, if so, removes it from {@link #workers}.
     *
     * <p>Only a non-core worker that currently has no task may exit, and only when it has been idle
     * longer than the keep-alive time or the pool holds more workers than {@code maxPoolSize}. The
     * "no task" condition matters: a worker can be handed a task while it is waiting, and without this
     * check it would exit on the over-capacity branch and the task would never run.
     *
     * @param worker the worker asking whether it should exit
     * @return {@code true} if the worker was removed and its thread must leave the run loop
     */
    private boolean processNonCoreWorkerExit(Worker worker) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (worker.isCore() || worker.task != null) {
                return false;
            }
            if (worker.isTimeout(keepAliveTime) || this.workers.size() > this.maxPoolSize) {
                workers.remove(worker);
                return true;
            }
            return false;
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Creates, registers and starts a new worker if the relevant size limit allows it.
     *
     * <p>The worker is registered in {@link #workers} before its thread is started, and the
     * registration is rolled back if {@link Thread#start()} fails. Previously the thread was started
     * first, so a failed registration left a running thread that the pool did not know about.
     *
     * @param task first task of the new worker, or {@code null} to start it idle
     * @param core whether the worker is a core worker; core workers are limited by {@code corePoolSize},
     *             other workers by {@code maxPoolSize}
     * @return {@code true} if the worker was registered and its thread started
     */
    private boolean addWorker(Runnable task, boolean core) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Refuse when the number of workers already reached the core/max limit.
            int workerCount = workers.size();
            int poolSize = core ? corePoolSize : maxPoolSize;
            if (workerCount >= poolSize) {
                return false;
            }
            Worker worker = new Worker(task, core);
            final Thread thread = worker.thread;
            if (thread == null) {
                // The thread factory did not supply a thread.
                return false;
            }
            if (!workers.add(worker)) {
                return false;
            }
            boolean started = false;
            try {
                thread.start();
                started = true;
            } finally {
                if (!started) {
                    // Starting failed: do not keep a worker without a running thread.
                    workers.remove(worker);
                }
            }
            if (workerCount + 1 > largestPoolSize) {
                largestPoolSize = workerCount + 1;
            }
            return true;
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Keeps adding idle core workers until {@link #addWorker(Runnable, boolean)} refuses to add another one.
     */
    private void startAllCoreThreads() {
        boolean added;
        do {
            added = addWorker(null, true);
        } while (added);
    }

    /**
     * Wakes up every registered worker by calling {@code interruptIfStarted()} on each of them.
     */
    public void interruptWorkers() {
        final ReentrantLock guard = this.mainLock;
        guard.lock();
        try {
            for (Worker each : this.workers) {
                each.interruptIfStarted();
            }
        } finally {
            guard.unlock();
        }
    }

    /**
     * Wakes up workers that report a positive idle time.
     *
     * @param onlyOne when {@code true}, only the first registered worker is examined (and woken if it is
     *                idle); when {@code false}, every worker is examined
     */
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock guard = this.mainLock;
        guard.lock();
        try {
            for (Worker candidate : workers) {
                final boolean idle = candidate.getIdleTimeNanos() > 0;
                if (idle) {
                    candidate.interruptIfStarted();
                }
                if (onlyOne) {
                    // The scan ends after the first worker regardless of whether it was idle.
                    break;
                }
            }
        } finally {
            guard.unlock();
        }
    }

    /**
     * Wakes up all idle workers; shorthand for {@code interruptIdleWorkers(false)}.
     */
    private void interruptIdleWorkers() {
        final boolean onlyOne = false;
        interruptIdleWorkers(onlyOne);
    }

    /**
     * Returns the factory used to create worker threads.
     *
     * @return the current thread factory
     */
    private ThreadFactory getThreadFactory() {
        final ThreadFactory factory = this.threadFactory;
        return factory;
    }

    /**
     * Sets the number of core workers.
     *
     * <p>The first {@code corePoolSize} registered workers (in insertion order) are flagged as core and
     * the rest as non-core. When the core size shrinks, idle workers are woken so they can re-evaluate
     * whether to exit; when it grows beyond the number of registered workers, additional idle core
     * workers are started.
     *
     * <p>Validation and the comparison with the current core size are performed while holding the
     * pool-size lock, so concurrent size changes cannot act on stale values.
     *
     * @param corePoolSize new number of core workers; must be between 0 and the current maximum pool size
     * @throws IllegalArgumentException if {@code corePoolSize} is negative or larger than the maximum pool size
     */
    public void setCorePoolSize(int corePoolSize) {
        final ReentrantLock poolSizeLock = this.poolSizeLock;
        poolSizeLock.lock();
        try {
            if (corePoolSize < 0 || corePoolSize > this.maxPoolSize) {
                throw new IllegalArgumentException();
            }
            int diff = corePoolSize - this.corePoolSize;
            if (diff == 0) {
                return;
            }
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                this.corePoolSize = corePoolSize;
                // Re-flag workers by position: the first corePoolSize workers are core.
                int position = 0;
                for (Worker worker : workers) {
                    worker.setCore(position++ < corePoolSize);
                }
                if (diff < 0) {
                    interruptIdleWorkers();
                } else if (corePoolSize > this.workers.size()) {
                    startAllCoreThreads();
                }
            } finally {
                mainLock.unlock();
            }
        } finally {
            poolSizeLock.unlock();
        }
    }

    /**
     * Sets the maximum number of workers and wakes idle workers when more workers than the new maximum
     * are registered.
     *
     * <p>Validation happens under the pool-size lock and the worker set is inspected under the main
     * lock; previously the set's size was read without the lock that guards it.
     *
     * @param maxPoolSize new maximum; must be positive and not smaller than the current core pool size
     * @throws IllegalArgumentException if {@code maxPoolSize} is not positive or smaller than the core pool size
     */
    public void setMaxPoolSize(int maxPoolSize) {
        final ReentrantLock poolSizeLock = this.poolSizeLock;
        poolSizeLock.lock();
        try {
            if (maxPoolSize <= 0 || maxPoolSize < this.corePoolSize) {
                throw new IllegalArgumentException();
            }
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                this.maxPoolSize = maxPoolSize;
                if (this.workers.size() > maxPoolSize) {
                    interruptIdleWorkers();
                }
            } finally {
                mainLock.unlock();
            }
        } finally {
            poolSizeLock.unlock();
        }
    }

    /**
     * Changes the core and maximum pool sizes with one call.
     *
     * <p>The two setters are invoked in the order that keeps each individual call valid: the maximum is
     * changed first when the new core size exceeds the current maximum, otherwise the core size is
     * changed first.
     *
     * @param corePoolSize new number of core workers; must not be negative
     * @param maxPoolSize  new maximum number of workers; must be positive and not smaller than {@code corePoolSize}
     * @throws IllegalArgumentException if the arguments violate the constraints above
     */
    public void setPoolSize(int corePoolSize, int maxPoolSize) {
        final boolean valid = corePoolSize >= 0 && maxPoolSize > 0 && maxPoolSize >= corePoolSize;
        if (!valid) {
            throw new IllegalArgumentException();
        }
        final boolean coreFitsCurrentMax = corePoolSize <= this.maxPoolSize;
        if (coreFitsCurrentMax) {
            setCorePoolSize(corePoolSize);
            setMaxPoolSize(maxPoolSize);
            return;
        }
        setMaxPoolSize(maxPoolSize);
        setCorePoolSize(corePoolSize);
    }

    /**
     * Sets the maximum idle time of non-core workers.
     *
     * <p>When the new value is shorter than the previous one, idle workers are woken so that waiting
     * non-core workers re-evaluate their timeout against the new value instead of sleeping out a wait
     * that was computed from the old, longer one.
     *
     * @param keepAliveTime maximum idle time of a non-core worker; must not be negative
     * @param timeUnit      unit of {@code keepAliveTime}
     * @throws IllegalArgumentException if {@code keepAliveTime} is negative
     */
    public void setKeepAliveTime(long keepAliveTime, TimeUnit timeUnit) {
        if (keepAliveTime < 0) {
            throw new IllegalArgumentException();
        }
        final long newKeepAliveNanos = timeUnit.toNanos(keepAliveTime);
        final long previousKeepAliveNanos = this.keepAliveTime;
        this.keepAliveTime = newKeepAliveNanos;
        if (newKeepAliveNanos < previousKeepAliveNanos) {
            interruptIdleWorkers();
        }
    }

    /**
     * Returns a snapshot of the registered workers.
     *
     * @return a new set containing the current workers in their registration order; changes to the
     *         returned set do not affect the pool
     */
    public Set<Worker> getWorkers() {
        final ReentrantLock guard = this.mainLock;
        guard.lock();
        try {
            return new LinkedHashSet<>(this.workers);
        } finally {
            guard.unlock();
        }
    }

    /**
     * Worker of the distributed thread pool: owns one thread and holds at most one assigned task.
     *
     * <p>The inherited {@link AbstractQueuedSynchronizer} state serves as a simple non-reentrant lock. It
     * is initialised to {@code -1}, becomes {@code 1} on {@link #lock()} and {@code 0} on {@link #unlock()}.
     * While the state is negative, {@link #interruptIfStarted()} does nothing.
     *
     * <p>{@code task}, {@code core} and {@code idleStart} are {@code volatile} because they are read by
     * threads that do not hold this worker's lock.
     */
    public final class Worker extends AbstractQueuedSynchronizer implements Runnable {

        /** Guards task hand-off and the {@link #termination} condition of this worker. */
        private final ReentrantLock mainLock = new ReentrantLock();

        /** Condition the worker thread waits on while it has nothing to do. */
        private final Condition termination = mainLock.newCondition();

        /** Thread created by the pool's thread factory for this worker; may be {@code null} if the factory returned none. */
        private final Thread thread;

        /** Task currently assigned to this worker, or {@code null} when it has none. */
        private volatile Runnable task;

        /** Whether this worker is currently flagged as a core worker. */
        private volatile boolean core;

        /** {@link System#nanoTime()} value at which the worker became idle, or {@code 0} while it has a task. */
        private volatile long idleStart;

        /**
         * Creates the worker and its (not yet started) thread.
         *
         * @param task first task, or {@code null} to start idle
         * @param core whether the worker starts as a core worker
         */
        private Worker(Runnable task, boolean core) {
            // Negative state: signals are suppressed until the state is first released.
            setState(-1);
            this.thread = getThreadFactory().newThread(this);
            this.task = task;
            this.core = core;
            if (task == null) {
                this.idleStart = System.nanoTime();
            }
        }

        /** Delegates to the pool's worker loop. */
        @Override
        public void run() {
            runWorker(this);
        }

        /**
         * Assigns a task to this worker if it has none and wakes the worker thread.
         *
         * @param task the task to assign
         * @return {@code true} if the task was assigned; {@code false} if {@code task} is {@code null} or
         *         the worker already has a task
         */
        private boolean setTask(Runnable task) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (this.task == null && task != null) {
                    this.task = task;
                    this.idleStart = 0L;
                    interruptIfStarted();
                    return true;
                }
            } finally {
                mainLock.unlock();
            }
            return false;
        }

        /** Clears the current task and records the moment the worker became idle. */
        private void terminate() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (this.task != null) {
                    this.task = null;
                    this.idleStart = System.nanoTime();
                }
            } finally {
                mainLock.unlock();
            }
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        @Override
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        private void lock() {
            acquire(1);
        }

        private boolean tryLock() {
            return tryAcquire(1);
        }

        private void unlock() {
            release(1);
        }

        private boolean isLocked() {
            return isHeldExclusively();
        }

        /**
         * Parks the worker thread until it is signalled or, for a non-core worker, until the remaining
         * keep-alive time has elapsed.
         *
         * <p>The task is re-checked under the lock before waiting. {@link #setTask(Runnable)} sets the task
         * and signals under the same lock, so a task assigned just before this call can no longer be missed
         * (previously a core worker could then wait indefinitely with a task already assigned).
         *
         * @param nanosKeepAliveTime keep-alive time in nanoseconds
         */
        private void awaitNanos(long nanosKeepAliveTime) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (this.task != null) {
                    // A task arrived before we got here; do not wait for a signal that was already sent.
                    return;
                }
                if (this.core) {
                    this.termination.await();
                } else {
                    long nanosTimeout = getNanosTimeout(nanosKeepAliveTime);
                    if (nanosTimeout > 0) {
                        this.termination.awaitNanos(nanosTimeout);
                    }
                }
            } catch (InterruptedException ignored) {
                // Treated like a wake-up: the caller re-evaluates the worker's state in its loop.
            } finally {
                mainLock.unlock();
            }
        }

        /**
         * @param nanosKeepAliveTime keep-alive time in nanoseconds
         * @return remaining idle time before the keep-alive expires (may be zero or negative), or
         *         {@code 0} when the worker is not idle
         */
        private long getNanosTimeout(long nanosKeepAliveTime) {
            final long start = this.idleStart;
            if (start == 0) {
                return 0;
            }
            return nanosKeepAliveTime + start - System.nanoTime();
        }

        /**
         * @param nanosKeepAliveTime keep-alive time in nanoseconds
         * @return {@code true} if the worker is idle and has been idle longer than the keep-alive time
         */
        private boolean isTimeout(long nanosKeepAliveTime) {
            final long start = this.idleStart;
            return start != 0L && System.nanoTime() - start > nanosKeepAliveTime;
        }

        /**
         * Wakes the worker thread by signalling its condition; despite the name no thread interrupt is
         * issued. Does nothing while the synchronizer state is still negative.
         */
        private void interruptIfStarted() {
            if (getState() >= 0) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    this.termination.signalAll();
                } finally {
                    mainLock.unlock();
                }
            }
        }

        /** @return the name of this worker's thread */
        public String getName() {
            return this.thread.getName();
        }

        /** @return the task currently assigned to this worker, or {@code null} */
        public Runnable getTask() {
            return this.task;
        }

        private void setCore(boolean core) {
            this.core = core;
        }

        /** @return whether this worker is currently flagged as a core worker */
        public boolean isCore() {
            return this.core;
        }

        /** @return nanoseconds since the worker became idle, or {@code 0} when it is not idle */
        public long getIdleTimeNanos() {
            final long start = this.idleStart;
            if (start == 0) {
                return 0;
            }
            return System.nanoTime() - start;
        }

        /** @return {@link #getIdleTimeNanos()} converted to milliseconds */
        public long getIdleTimeMillis() {
            return TimeUnit.NANOSECONDS.toMillis(getIdleTimeNanos());
        }

        /**
         * Workers are equal only to themselves. The previous implementation treated any two workers with
         * the same hash code as equal, which could make the pool's worker set reject a distinct worker.
         */
        @Override
        public boolean equals(Object obj) {
            return this == obj;
        }

        @Override
        public int hashCode() {
            return super.hashCode();
        }
    }

    /**
     * Empty placeholder type: it declares no fields or methods.
     * NOTE(review): presumably intended to carry a snapshot of worker information; confirm or remove.
     */
    public static class WorkerInfo {

    }

    /**
     * Default thread factory of the pool. Threads are named
     * {@code distributedThreadPool-<pool number>-thread-<thread number>}, are non-daemon and run with
     * {@link Thread#NORM_PRIORITY}.
     */
    private static class DefaultDistributedThreadFactory implements ThreadFactory {
        /** Counter that gives every factory instance its own pool number. */
        private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
        private final ThreadGroup group;
        /** Counter for the threads created by this factory. */
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        private DefaultDistributedThreadFactory() {
            final SecurityManager securityManager = System.getSecurityManager();
            if (securityManager == null) {
                this.group = Thread.currentThread().getThreadGroup();
            } else {
                this.group = securityManager.getThreadGroup();
            }
            this.namePrefix = "distributedThreadPool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
        }

        /**
         * Creates a new thread for the given runnable.
         *
         * @param r the code the thread will run
         * @return a non-daemon, normal-priority thread in this factory's thread group
         */
        @Override
        public Thread newThread(Runnable r) {
            final String threadName = namePrefix + threadNumber.getAndIncrement();
            final Thread created = new Thread(group, r, threadName, 0);
            if (created.isDaemon()) {
                created.setDaemon(false);
            }
            if (created.getPriority() != Thread.NORM_PRIORITY) {
                created.setPriority(Thread.NORM_PRIORITY);
            }
            return created;
        }
    }
}